PySpark SQL Exercises

1. Select Unique Records

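        from pyspark.sql import SparkSession

        # Create (or reuse) a Spark session; Databricks notebooks already provide `spark`
        spark = SparkSession.builder.appName("pyspark-exercises").getOrCreate()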

        # Sample data
        data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Alice", 25)]
        df = spark.createDataFrame(data, ["id", "name", "age"])

        df.createOrReplaceTempView("people")

        # Select distinct names
        unique_names_df = spark.sql("SELECT DISTINCT name FROM people")

        # Show result
        unique_names_df.show()
    

2. Count Records

        # Count the number of rows in the DataFrame
        count_df = spark.sql("SELECT COUNT(*) as total_count FROM people")

        # Show result
        count_df.show()
    

3. Group By and Count

        # Group by name and count occurrences
        group_by_count_df = spark.sql("SELECT name, COUNT(*) as name_count FROM people GROUP BY name")

        # Show result
        group_by_count_df.show()
    

4. Group By with Aggregation (SUM)

        # Sample data
        data = [("Alice", 1000), ("Bob", 1500), ("Alice", 2000)]
        df2 = spark.createDataFrame(data, ["name", "salary"])

        df2.createOrReplaceTempView("salaries")

        # Group by name and sum salaries
        sum_salaries_df = spark.sql("SELECT name, SUM(salary) as total_salary FROM salaries GROUP BY name")

        # Show result
        sum_salaries_df.show()
    

5. Group By with Aggregation (Average)

        # Group by name and calculate average salary
        avg_salaries_df = spark.sql("SELECT name, AVG(salary) as avg_salary FROM salaries GROUP BY name")

        # Show result
        avg_salaries_df.show()
    

6. Filter Records with WHERE Clause

        # Filter records where salary is greater than 1200
        filter_df = spark.sql("SELECT * FROM salaries WHERE salary > 1200")

        # Show result
        filter_df.show()
    

7. Order Records by a Column

        # Order records by salary in descending order
        order_by_df = spark.sql("SELECT * FROM salaries ORDER BY salary DESC")

        # Show result
        order_by_df.show()
    


PySpark Coding Exercises

1. Count the Number of Occurrences of Each Word

        from pyspark.sql import SparkSession
        from pyspark.sql.functions import explode, split, col

        # Sample data
        data = [("Hello world",), ("Hello PySpark",), ("Spark is great",)]
        df = spark.createDataFrame(data, ["text"])

        # Split the text into words
        words_df = df.select(explode(split(col("text"), " ")).alias("word"))

        # Count occurrences of each word
        word_count_df = words_df.groupBy("word").count()

        # Show result
        word_count_df.show()
    

2. Filter Data Based on a Condition

        # Sample data
        data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Cathy", 22)]
        df = spark.createDataFrame(data, ["id", "name", "age"])

        # Filter rows where age > 25
        filtered_df = df.filter(col("age") > 25)

        # Show result
        filtered_df.show()
    

3. Join Two DataFrames

        # Sample data for DataFrame 1
        data1 = [(1, "Alice"), (2, "Bob"), (3, "Cathy")]
        df1 = spark.createDataFrame(data1, ["id", "name"])

        # Sample data for DataFrame 2
        data2 = [(1, "HR"), (2, "Engineering"), (4, "Marketing")]
        df2 = spark.createDataFrame(data2, ["id", "department"])

        # Inner join on 'id'
        joined_df = df1.join(df2, on="id", how="inner")

        # Show result
        joined_df.show()
    

4. Group By and Aggregate Data

        from pyspark.sql.functions import avg

        # Sample data
        data = [("Alice", "HR", 25), ("Bob", "Engineering", 30), ("Cathy", "HR", 28)]
        df = spark.createDataFrame(data, ["name", "department", "age"])

        # Group by department and calculate average age
        avg_age_df = df.groupBy("department").agg(avg("age").alias("avg_age"))

        # Show result
        avg_age_df.show()
    

5. Create a UDF (User Defined Function)

        from pyspark.sql.functions import udf
        from pyspark.sql.types import StringType

        # Sample data
        data = [(1, "Alice"), (2, "Bob"), (3, "Cathy")]
        df = spark.createDataFrame(data, ["id", "name"])

        # Define a UDF to add a prefix to a name
        def add_prefix(name):
            return "Mr./Ms. " + name

        add_prefix_udf = udf(add_prefix, StringType())

        # Apply the UDF
        df_with_prefix = df.withColumn("name_with_prefix", add_prefix_udf(col("name")))

        # Show result
        df_with_prefix.show()
    

6. Handling Missing Data

        # Sample data with missing values
        data = [(1, "Alice", 25), (2, "Bob", None), (3, "Cathy", 28)]
        df = spark.createDataFrame(data, ["id", "name", "age"])

        # Fill missing values in 'age' with a default value of 0
        filled_df = df.na.fill({"age": 0})

        # Show result
        filled_df.show()
    

7. Write Data to a CSV File

        # Sample data
        data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Cathy", 22)]
        df = spark.createDataFrame(data, ["id", "name", "age"])

        # Write DataFrame to CSV (Spark writes a directory of part files, not a single file)
        df.write.csv("/path/to/output", mode="overwrite", header=True)
    

PySpark Read Write SQL

Introduction

This guide covers how to manage data in a data lake on Databricks using SQL within PySpark. A data lake is a centralized repository that stores structured and unstructured data at any scale.

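Loading Data from the Data Lake

Let's start by reading a file from the data lake with a PySpark SQL query. Spark SQL can query a file directly when the path is wrapped in backticks and prefixed with its format.

# Direct file queries use default reader options, so a header row is read as data and columns are named _c0, _c1, ...
spark.sql("SELECT * FROM csv.`/mnt/data/sample.csv`").show()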
    

Data Transformation

After loading the data, you can transform it with SQL queries in PySpark. The example below groups by category and sums the price; it assumes a table or temporary view named sales with category and price columns.

spark.sql("SELECT category, SUM(price) AS total_price FROM sales GROUP BY category").show()
    

Writing Data Back to Data Lake

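Once you have processed the data, you can write the query result back to your data lake in formats such as Parquet.

# Keep the query result as a DataFrame so it can be written out
df_grouped = spark.sql("SELECT category, SUM(price) AS total_price FROM sales GROUP BY category")
df_grouped.write.format("parquet").save("/mnt/data/output/")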
    

PySpark SQL - Find Duplicate Records

This query finds email values that occur more than once in the customers table.

spark.sql("SELECT email, COUNT(email) AS email_count FROM customers GROUP BY email HAVING COUNT(email) > 1").show()
    

PySpark SQL - Top Categories by Price

This query returns the ten categories with the highest total price.

spark.sql("SELECT category, SUM(price) AS total_price FROM sales GROUP BY category ORDER BY total_price DESC LIMIT 10").show()
    


PySpark Read Write DataFrame

Introduction

This guide covers how to manage data in a data lake on Databricks using the PySpark DataFrame API. A data lake is a centralized repository that stores structured and unstructured data at any scale.

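Loading Data from the Data Lake

Let's start by reading a CSV file from the data lake into a DataFrame using PySpark on Databricks.

# inferSchema makes numeric columns such as price load as numbers instead of strings
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/data/sample.csv")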
df.show()
    

Data Transformation

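After loading the data, you can transform it with the DataFrame API. For example, group by category and sum the price.

# Rename the aggregate: the default name "sum(price)" contains parentheses, which some Spark versions and table formats reject on write
df_grouped = df.groupBy("category").sum("price").withColumnRenamed("sum(price)", "total_price")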
df_grouped.show()
    

Writing Data Back to Data Lake

Once you have processed the data, you can write it back to your data lake in various formats such as Parquet or Delta.

df_grouped.write.format("parquet").save("/mnt/data/output/")
    

PySpark - Find Duplicate Records

This code finds email values that appear in more than one row, along with how many times each occurs.

df_duplicates = df.groupBy("email").count().filter("count > 1")
df_duplicates.show()
    

PySpark - Top Categories by Price

This code retrieves the top categories by total price, the DataFrame equivalent of a SQL GROUP BY followed by ORDER BY ... DESC, and shows the first 10 rows.

df_top_categories = df.groupBy("category").sum("price").orderBy("sum(price)", ascending=False)
df_top_categories.show(10)